In this tutorial, we discuss multi-consumer, multi-producer lock-free queues and briefly evaluate under which situations a lock-free queue might be more suitable than a lock-based one.
We’ll be converting the fine-grained queue from Lecture 5 into a lock-free version. If you’re not familiar with the design, please refer to the Lecture 5 slides.
Let’s consider the operations each consumer and producer will need to do, step by step.
Producers’ enqueue implementation:
A. Allocate a new Node (new_dummy).
B. Get our work node (work_node) by reading m_queue_back.
C. Write the job into work_node.
D. Append new_dummy to work_node by setting its next pointer. This also simultaneously converts the work node to a “real” job-holding node.
E. Update m_queue_back so other producers know where the new end of the queue is.

We see that we need to perform three writes to publicly accessible memory, in steps C, D, and E. If we naively make this lock-free, even if we have no “data races” in the technical C++ sense (e.g. by using atomic everywhere), we may still have race conditions.
For example, a race condition happens when two producers attempt to place their job in the same dummy node, thus overwriting the other producer’s job.
This creates at least two potential problems:

1. Two producers may read the same work_node in step (B), and both write their job into it, one overwriting the other.
2. A consumer may observe the new node before the producer has finished writing the job into it, and read an incomplete job.
We can solve problem (1) by choosing (E) to be the “definitive” source of truth for which producer is allowed to modify which nodes.
// queue field:
std::atomic<Node*> m_queue_back;
// doesn't work:
{
Node* new_dummy = new Node();
Node* work_node = m_queue_back;
m_queue_back = new_dummy;
// ... modify work_node
}
// works:
{
Node* new_dummy = new Node();
Node* work_node = m_queue_back.exchange(new_dummy);
// ... modify work_node
}

In other words, we combine (B) and (E) into one atomic step. In doing
so, each producer will now get their own Node to work with, and we avoid them
stepping on each other’s toes.
To solve problem (2), we need to release our writes (the job) to the
consumer in a way that they can reliably acquire. To do this, we will
use release-acquire semantics that we have discussed previously. We will
use a node’s next pointer as the shared memory
location.
If we perform a release-write to work_node->next,
any consumers that read that node’s ->next will
synchronise-with the producer. If they read a nullptr, then
there isn’t a valid job (it’s a dummy).
Note that the invariant of this data structure is that the last node
(m_queue_back) always points to a dummy node. All non-dummy
nodes will have a non-null next pointer, and only the dummy
node has a null next pointer.
This results in the following algorithm:
A. Allocate a new Node (new_dummy). This will be the queue’s new dummy node after we are done.
B. Atomically perform m_queue_back.exchange(new_dummy) to obtain our own work node (work_node).
C. Write the job into work_node.
D. Point work_node at our new_dummy by release-storing its next pointer. This also simultaneously converts work_node to a “real” job-holding node, and also serves to synchronise-with the corresponding consumer that reads this pointer.

We can show that this algorithm works to solve both problems.
Next, we can move to the consumer side of the queue. Since we are
writing a lock-free queue, we will not implement a blocking dequeue,
only a non-blocking try_pop.
At a high level, these are the steps that we need to do:
A. Read m_queue_front for the front of the queue (old_front).
B. Check whether the queue is empty by reading the front node’s next pointer; if it is null, the front is the dummy and there is no job.
C. Update m_queue_front to point at the next node (ie. m_queue_front = m_queue_front->next).
D. Return the job (return old_front->job).

Just like the case of producers, there are at least two potential race conditions, even if all the publicly accessible memory is atomic:

1. Two consumers may interleave their steps, splicing off the same node and returning the same job twice.
2. A consumer may read the job before the producer that enqueued it has finished writing it.
From the previous section, we know that we can solve problem (2) by synchronising the producer with us before we read the job, so we know we have to perform step (B) with an acquire memory ordering.
What about problem (1)?
We solved it previously by ensuring no two producers operate on the
same node by using an atomic exchange to perform the read
and update operations at the same time. Unfortunately, we cannot do that
so easily here:
// The following cannot be made atomic
Node* node_to_consume = m_queue_front.exchange(
m_queue_front->next // There is an entirely separate atomic load here
// ... furthermore we only want to exchange if it's not null
// so this code doesn't even do what we need it to do
);

The main problem is that we need to only perform the exchange conditionally, and we need to load the new value we want to exchange from the pointer we are trying to exchange itself…
As you can see, this is not possible.
In order to solve this problem, we must use the compare-and-swap
pattern, sometimes known as the compare-exchange pattern. This is
another atomic operation, similar to the existing ones we’ve seen so far
(load, store, fetch_add,
etc.).
As its name might suggest, it first performs a comparison on the memory location, checking if the current value is the same as our expected value. If it isn’t, then we give up. If it is, then we store our new value. In either case, we get the old/current value back. All of this is done in one atomic instruction. As you might be able to tell, this is a very powerful primitive.
The best way to reason about compare-and-swap is that it presents the operation of “set X from OLD to NEW”, rather than simply “set X to NEW”. This statement doesn’t make sense if X was not OLD before setting, and so the compare-and-swap should fail.
In this way, we are performing the exchange only if nobody else has done anything to it in the meantime — ensuring that our idea of reality and the actual reality match before we actually write anything.
fetch_add with the CAS Loop Pattern

The true power of compare-and-swap becomes apparent when we put it in a loop. Indeed, most algorithms that use compare-and-swap typically use them in a loop, since we (usually) have to handle the failure case (ie. when the current value is not our expected value).
This is how we might implement atomic fetch_add using
compare-and-swap:
int fetch_add(std::atomic<int>& value, int add)
{
// first, load the old value.
int old_value = value.load();
while(true)
{
// this is the new value which we want to store.
int new_value = old_value + add;
// set value *FROM old_value* TO new_value.
// if it succeeded (returns true), we're done.
if(value.compare_exchange_weak(old_value, new_value))
return old_value;
// here, we failed -- someone else changed `value` so that
// it was no longer `old_value`.
// `old_value` is taken by reference, so we get the current
// value back even on failure (and so we don't need to load it ourselves).
// go back to the top of the loop and retry, with our refreshed `old_value`.
}
}

atomic<int>::fetch_add using a compare-and-swap loop
Of course, the real implementation of fetch_add is often
just a single instruction on most architectures.
As mentioned in the comments, one thing to keep in mind is that
compare_exchange takes in the old_value as a
non-const reference. Whether or not the CAS succeeded, we still would
have had to load value (to do the compare), and so we
conveniently get that old value returned back to us.
One peculiarity you might have noticed is the appearance of
_weak in compare_exchange — that implies the
existence of a _strong variant, right? Indeed, you are
correct.
You can expand the box below to learn more about why these two variants exist, but the key point is that the weak version can spuriously fail. That is, it can fail to perform the exchange even though the current value matched our expected value. The strong version is not susceptible to such weaknesses.
In general, the guidance is that the _strong version can
be more expensive. If you are already using CAS in a loop and each
iteration of the loop is relatively cheap, then you should probably use
the _weak version. In our case, since the only update we had to
do was a simple addition, we used _weak.
On the other hand, if updating the expected state on failure requires
heavy computation, or the CAS is not even being used in a loop, then you
should prefer the _strong version.
Read more from Raymond Chen.
Of course, we would prefer if compare_exchange couldn’t
fail spuriously, at all. One of the main reasons that a weak version
exists is due to hardware architectures.
On x86, we have the cmpxchg instruction, which is
guaranteed by the architecture to not fail spuriously. However, other
architectures use a different primitive, often called
load-linked/store-conditional (LL/SC), to implement
compare-and-swap.
The load of the current value is the “load-linked”; the CPU automatically handles “breaking” the LL/SC relationship (making the conditional store fail) if anybody updates the load-linked location, thus giving us the compare-and-swap behaviour. Unfortunately, some CPU architectures will invalidate the LL/SC even if nobody stored to the location. Common cases include writes to other addresses in the same cache line as the load-linked location, and context switches or interrupts occurring between the load-linked and the store-conditional.
try_pop with a CAS loop

Now that we know how to use compare-and-swap loops to carry out
complex updates, we can use it to implement try_pop
correctly. The key idea behind our implementation is that if any other
thread managed to take a job while we were trying to, then we fail and
try again — instead of leaving the queue in a broken state.
This gives us the following high-level algorithm:
A. Read old_front = m_queue_front for the front of the queue.
B. Acquire-read old_front’s next pointer.
C. If it is null, old_front is the dummy node and the queue is empty, so return nullopt.
D. Update m_queue_front to point at the next node with CAS: m_queue_front.compare_exchange_weak(old_front, next). If the CAS fails, retry from the start.
E. Return the job (return old_front->job).

We now translate our partial snippets into actual C++, along with the correct memory orders.
class JobQueue1
{
// alias for std::memory_order
using stdmo = std::memory_order;
// A node is a dummy node if its next pointer is set to QUEUE_END
// We use the next ptr to establish the synchronizes-with relationship
// next is in charge of "releasing" job
struct Node
{
std::atomic<Node*> next = QUEUE_END;
Job job;
};
static inline Node* const QUEUE_END = nullptr;
// Avoid false sharing with alignment
alignas(64) std::atomic<Node*> m_queue_back; // producer end
alignas(64) std::atomic<Node*> m_queue_front; // consumer end
public:
// Queue starts with a dummy node
JobQueue1() //
: m_queue_back(new Node())
, m_queue_front(m_queue_back.load(stdmo::relaxed))
{
}
~JobQueue1()
{
// Assumption: no other threads are accessing the job queue
Node* cur_node = m_queue_front.load(stdmo::relaxed);
while(cur_node != QUEUE_END)
{
Node* next = cur_node->next;
delete cur_node;
cur_node = next;
}
}
public:
void push(Job job)
{
Node* new_dummy = new Node();
// Use m_queue_back.exchange to establish a global order of all enqueues
//
// We use acq_rel because:
// - Release initialisation of `new_dummy`
// - Similarly acquire initialisation of `work_node`
// initialisation = what the Node constructor does
Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);
// now, work_node is unique for every producer (push)
// First write the job
work_node->job = job;
// Now "release" the job to consumers,
// and also append to LL at the same time
work_node->next.store(new_dummy, stdmo::release);
}
std::optional<Job> try_pop()
{
// Splice node from the front of queue, but only if it's not dummy
// Successfully splicing a node establishes global order of pops
Node* old_front = m_queue_front.load(stdmo::relaxed);
while(true)
{
// "Acquire" job if it exists
// Also use next pointer to know what to update m_queue_front to
Node* new_front = old_front->next.load(stdmo::acquire);
if(new_front == QUEUE_END)
{
// Observed dummy node, so we can abort as the queue is empty
// (or close to it)
return std::nullopt;
}
// for now, we use relaxed.
if(m_queue_front.compare_exchange_weak(old_front, //
new_front, //
stdmo::relaxed))
{
// Node now belongs to us
break;
}
// We couldn't update m_queue_front, so someone else successfully
// popped a node. We'll just loop.
}
Job job = old_front->job;
delete old_front;
return job;
}
};

Let’s test it out with a configuration of 2 producers and 3 consumers!
We’ll have the 2 producers put in a bunch of jobs, and the consumers
will each read out some of them and sum their ids in a
consumer-specific partial sum. main will wait for all
threads to complete, and then sum the partial sums.
#include <thread>
int main()
{
JobQueue1 queue;
auto producer1 = std::thread([&queue]() {
for(int i = 1; i <= 150000; i++)
queue.push(Job { i, i });
});
auto producer2 = std::thread([&queue]() {
for(int i = 150001; i <= 300000; i++)
queue.push(Job { i, i });
});
// 3 partial sums
int sum1 = 0;
int sum2 = 0;
int sum3 = 0;
auto consumer_fn = [&queue](int& sum) {
// Sum 100000 things
for(int i = 0; i < 100000; i++)
{
while(true)
{
std::optional<Job> job = queue.try_pop();
if(job)
{
sum += job->id;
break;
}
}
}
};
auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
auto consumer2 = std::thread(consumer_fn, std::ref(sum2));
auto consumer3 = std::thread(consumer_fn, std::ref(sum3));
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
consumer3.join();
printf("Sum of 1 to 300000 modulo integer limit: %d\n", sum1 + sum2 + sum3);
}

Running it normally might give you an answer, and that answer might even be the correct answer. But if we run with ThreadSanitizer enabled, we get a lot of warnings, not limited to data races!
(Let this be a warning to always test with ThreadSanitizer!)
When using compare-and-swap loops, one thing that must be immediately addressed is the ABA problem.
We made the assumption that because we perform steps A+B+C atomically
with a CAS loop, we only succeed if the value of
m_queue_front was still old_front (the expected value) at the time of the write.
But if old_front can be set to another value, and then
set back to the same value again in time for the CAS operation to be
performed, the CAS would succeed! This can happen because previously freed
memory addresses may be reused by subsequent calls to new Node().
This is where the name comes from: a value initially has value A, is set to B, and then back to A.
This is not always a problem. For example, our fetch_add
implementation above really does work, as we don’t rely on the property
that A actually never changed in the loop. Instead all we
really care about is that the new value is 5 higher than the old
value.
In our case, we DO care that the queue has not changed, since it’s
important that new_front really does refer to the second
node in the queue. We only have this guarantee if the queue really did
not change between the start and end of the loop.
Here’s an example of the ABA problem causing things to break.
First, a consumer tries to pop a node, and sees this state before entering the CAS loop:
Now, the OS is mean to our consumer and puts it to sleep. In the
meantime, the queue goes through many pushes and pops, allocating and
deallocating nodes as it goes. It still holds a pointer to (what it
thinks is) the old_front, address 0x20.
However, while it may point to a node, the identity of the
node is now different! Previously we had Job{5, 5} — but
now it’s Job{100, 100}, a completely different node that
just happened to have the same address.
On the other hand, new_front (the second node in the
queue when the consumer last checked) might have already been
deallocated, and now the pointer points to garbage:
Now, the consumer wakes up. It tries to perform the compare-and-swap,
and succeeds. This is because the pointer
value of old_front is the same as the pointer
value of m_queue_front — even though the
identity of the node has changed!
After succeeding at the CAS, we set the queue’s front to point at what used to be the second node, but now points at garbage:
Of course, now the queue is completely broken, plus our front pointer points at garbage memory!
There are several ways to solve the ABA problem, but it is most commonly solved by including a “generation counter” alongside whatever data we’re performing the CAS on.
How do generation counters work? The idea is that we tie a unique number together with the pointer, so that even if the address is the same, the value (which is a combination of both the pointer and the counter) is different.
We use a 64-bit integer for our counter; this means that, for us to get a false positive on the compare-and-swap (leading to the ABA problem), the following conditions must hold:

1. The new front node of m_queue_front was allocated at the same address as our old one.
2. The generation counter wrapped all the way around, i.e. a multiple of 2^64 successful pops happened while we were suspended.

This is highly unlikely, mainly due to the second condition. So with very high probability, we can avoid the ABA problem by using generation counters.
In our case, we just add a generation count next to the front pointer, like so:
struct alignas(16) GenNodePtr
{
Node* node;
uintptr_t gen;
};
static_assert(std::atomic<GenNodePtr>::is_always_lock_free);
alignas(64) std::atomic<Node*> m_queue_back; // producer end
alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end
Note that we don’t need to do this for the back pointer; since we are
not using a compare-and-swap loop for push, we are not
vulnerable to the ABA problem.
As for the rest of the queue, the implementation is similar to demo 1, just with a few changes to adapt to the generation-counted head pointer.
// demo2.cpp
#include <atomic>
#include <optional>
struct Job
{
int id;
int data;
};
// (Incorrect)
// Unbounded lock free queue with push and non-blocking try_pop
//
// Avoids ABA using generation counter
// This design is incorrect as it (still) suffers from UAF.
class JobQueue2
{
// alias for std::memory_order
using stdmo = std::memory_order;
// A node is a dummy node if its next pointer is set to QUEUE_END
// We use the next ptr to establish the synchronizes-with relationship
// next is in charge of "releasing" job
struct Node
{
std::atomic<Node*> next = QUEUE_END;
Job job;
};
static inline Node* const QUEUE_END = nullptr;
struct alignas(16) GenNodePtr
{
Node* node;
uintptr_t gen;
};
static_assert(std::atomic<GenNodePtr>::is_always_lock_free);
alignas(64) std::atomic<Node*> m_queue_back; // producer end
alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end
public:
// Queue starts with a dummy node
JobQueue2() //
: m_queue_back(new Node())
, m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
{
}
~JobQueue2()
{
// Assumption: no other threads are accessing the job queue
Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
while(cur_node != QUEUE_END)
{
Node* next = cur_node->next;
delete cur_node;
cur_node = next;
}
}
public:
void push(Job job)
{
// same as the previous implementation
Node* new_dummy = new Node();
Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);
work_node->job = job;
work_node->next.store(new_dummy, stdmo::release);
}
std::optional<Job> try_pop()
{
// Splice node from the front of queue, but only if it's not dummy
// Successfully splicing a node establishes global order of pops
// Relaxed ordering because we don't need to synchronize with other
// consumers, we need to synchronize with the producer that made the
// node, which is done via the `next` ptr.
GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
while(true)
{
// this part is similar -- just need an additional `.node` to get the
// node out from the GenNodePtr
Node* old_front_next = old_front.node->next.load(stdmo::acquire);
if(old_front_next == QUEUE_END)
return std::nullopt;
// note that the generation is strictly increasing
GenNodePtr new_front { old_front_next, old_front.gen + 1 };
// this part is also similar, except we CAS with the GenNodePtr instead
// of just a simple Node*.
if(m_queue_front.compare_exchange_weak(old_front, new_front, stdmo::relaxed))
{
// Node now belongs to us
break;
}
}
Job job = old_front.node->job;
delete old_front.node;
return job;
}
};
#include <thread>
int main()
{
JobQueue2 queue;
auto producer1 = std::thread([&queue]() {
for(int i = 1; i <= 150000; i++)
queue.push(Job { i, i });
});
auto producer2 = std::thread([&queue]() {
for(int i = 150001; i <= 300000; i++)
queue.push(Job { i, i });
});
// 3 partial sums
int sum1 = 0;
int sum2 = 0;
int sum3 = 0;
auto consumer_fn = [&queue](int& sum) {
// Sum 100000 things
for(int i = 0; i < 100000; i++)
{
while(true)
{
std::optional<Job> job = queue.try_pop();
if(job)
{
sum += job->id;
break;
}
}
}
};
auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
auto consumer2 = std::thread(consumer_fn, std::ref(sum2));
auto consumer3 = std::thread(consumer_fn, std::ref(sum3));
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
consumer3.join();
printf("Sum of 1 to 300000 modulo integer limit: %d\n", sum1 + sum2 + sum3);
}
Let’s take a look at the try_pop function to see how the
generation counters are used:
GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
while(true)
{
// this part is similar -- just need an additional `.node` to get the
// node out from the GenNodePtr
Node* old_front_next = old_front.node->next.load(stdmo::acquire);
if(old_front_next == QUEUE_END)
return std::nullopt;
// note that the generation is strictly increasing
GenNodePtr new_front { old_front_next, old_front.gen + 1 };
// this part is also similar, except we CAS with the GenNodePtr instead
// of just a simple Node*.
if(m_queue_front.compare_exchange_weak(old_front, new_front, stdmo::relaxed))
{
// Node now belongs to us
break;
}
}

try_pop method
We’re now loading a GenNodePtr when we get the old
front, and we’re also using it to do the compare-and-swap. The most
important thing here is that the generation of our
new_front (if we succeeded at the CAS) is “newer” than the
old front.
As explained above, the use of generation counters means that we shouldn’t get an ABA problem any more. So now, we should be good right? Let’s run it with ThreadSanitizer…
$ ./demo2.tsan
WARNING: ThreadSanitizer: heap-use-after-free (pid=1889172)
Atomic read of size 8 at 0x7b0400002800 by thread T4:
<...>
Previous write of size 8 at 0x7b0400002800 by thread T3:
<...>
Ouch. We still have races, because we still have another problem!
GenNodePtr
How can we be sure that our GenNodePtr is actually
lock-free? The keen-eyed among you might have already noticed that
std::atomic<T> doesn’t actually make any guarantees
about being lock-free, only that it is atomic.
There are two ways we can tell: the is_lock_free method on
an instance, and the static data member
is_always_lock_free. The reason there are two is that a
given object might only be lock-free if it is suitably aligned, and the
alignment of a particular object can be runtime-dependent.
If we want to know whether a type is always lock free, then
we can use std::atomic<T>::is_always_lock_free
— this is true if the type is always lock-free, regardless of its
alignment. We used a static_assert on
our GenNodePtr to make sure.
16-byte atomic compare-and-swap is done with the
cmpxchg16b instruction on x86_64. This isn’t supported by
some very old x86_64 CPUs, so note that we had to pass
-march=native to the compiler to ensure that it uses this
instruction; otherwise, our static assertion will fail.
While we won’t succeed at performing a CAS when other consumers have
changed the queue, we’re still dereferencing a
pointer from a possibly stale value of m_queue_front (which
we stored as old_front).
This can cause data races as it’s possible a new node is allocated
again on the same address (regardless of generation!), and now we can
have the constructor of std::atomic race with a use of the
same object with .load().
Solutions to this problem generally fall under a few classes:

1. Hazard pointers: each thread publishes which nodes it is currently accessing, and a node is only freed once no thread has it published.
2. Deferred reclamation: keep track of the threads currently inside
try_pop, and when the last one leaves, we free all of them
at once.
3. Reference counting (eg. shared_ptr) to
know when there are no more remaining references to a particular
object.

We’ll go with a variant of solution (2). Rather than trying to delete
when the last try_pop leaves, which can be very rare, we
simply won’t try to delete nodes at all while the queue is alive, but
store them somewhere so that we can delete them all at once when the
queue is being destructed.
However, this would still allow a memory leak to occur, and our memory usage can keep increasing as the queue is used more. To avoid this, we’ll reuse nodes that were deleted and put them back in the queue when we need new nodes.
This essentially functions as a “free list” of nodes that we recycle for future use. We only allocate new nodes from the heap when our recycling centre has been exhausted. Note that this means that the peak usage of our queue can still be quite high, and if the queue is very long for only a short time, we’ll keep those recycled nodes around doing nothing.
However, this is still a relatively simple solution, so we’ll stick with it.
shared_ptr?
The keen-eyed among you might have noticed that the C++ standard
library actually has a specialisation of std::atomic for
shared_ptr. So, why don’t we use it?
Well, the problem is that it’s not lock-free. None of the 3 major
implementations of the STL (libc++, libstdc++, and MSVC’s STL) have a
lock-free implementation of this — it just uses mutexes under the hood.
Note that nowhere in the specification does it say that
std::atomic must actually be lock-free!
(Only std::atomic_flag is guaranteed to be
lock-free).
Since we’re interested in building a specifically lock-free queue, we opted not to use this. There’s a few CppCon talks about atomic shared pointers you can watch, if you’re interested:
As environmentally conscious students, we should recycle whenever possible. Since we don’t care about the order that recycled nodes are used, we can just use a stack for it, rather than a queue. However, since we still want the queue overall to be lock-free, our recycling centre also needs to be lock-free!
Yes, our concurrent queue implementation has a concurrent stack implementation hidden inside (:
First, for the recycling node stack, we just need a single pointer to keep track of the top of the stack:
// we use these as sentinel values.
static inline Node* const QUEUE_END = nullptr;
static inline Node* const STACK_END = QUEUE_END + 1;
struct alignas(16) GenNodePtr
{
Node* node;
uintptr_t gen;
};
static_assert(std::atomic<GenNodePtr>::is_always_lock_free);
alignas(64) std::atomic<Node*> m_queue_back; // producer end
alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end
alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack

As you can see, most of it is the same. We added a new sentinel value to use as the “empty stack” value. Before we forget, we still need to clean up the stack in the queue’s destructor, as promised. It’s relatively simple:
// we need to clean up the recycled nodes as well
cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
while(cur_node != STACK_END)
{
Node* next = cur_node->next;
delete cur_node;
cur_node = next;
}

Now for the fun part, which is allocating a new node (or getting one from our recycling stack):
// either get a node from the recycling stack if we have some,
// or allocate a new one if we don't.
Node* get_recycled_node_or_allocate_new()
{
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
// if we have no more recycled nodes, return a newly-allocated one.
if(old_stack_top.node == STACK_END)
return new Node();
Node* cur_stack_next = old_stack_top.node->next.load(stdmo::relaxed);
GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };
if(m_recycled_stack_top.compare_exchange_weak( //
old_stack_top, //
new_stack_top, //
stdmo::relaxed))
{
// successfully got a node from the recycling centre
return old_stack_top.node;
}
}
}

Next, we need a way to put things into this recycling stack when we pop nodes from our queue. The implementation is very similar to our classic CAS-loop-with-generation-counter pattern:
// Put node in recycling centre
void add_node_to_recycling_stack(Node* node)
{
// Standard CAS loop with generation counter to avoid ABA.
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
node->next.store(old_stack_top.node, stdmo::relaxed);
GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };
if(m_recycled_stack_top.compare_exchange_weak( //
old_stack_top, //
new_stack_top, //
stdmo::relaxed))
{
break;
}
}
}

All that’s left is to use these functions when we create and delete
nodes (instead of calling new and delete directly).
That’s fairly straightforward:
void push(Job job)
{
Node* new_dummy = get_recycled_node_or_allocate_new();
new_dummy->next.store(QUEUE_END, stdmo::relaxed);
// the rest is the same...
}

Note that for push, we have to explicitly reset the
next pointer to our sentinel, because it might have been
recycled and have stale data inside.
std::optional<Job> try_pop()
{
// most of it is the same...
// only this part changes
Job job = old_front.node->job;
add_node_to_recycling_stack(old_front.node);
return job;
}

A question you might want to ask is: can we still get a use-after-free situation?
No, we can’t. By definition, we never really free nodes, only add them to the recycling centre. Thus, we will never end up dereferencing a deallocated node, which was our UAF problem in the first place.
We might end up checking the ->next of a node that’s
already been recycled, but that’s fine — it simply won’t match with what
we see in the queue, and we’ll retry our CAS loop.
In our recycling stack implementation, we’ve used
memory_order_relaxed, since the nodes in the stack don’t
contain any data other than the next pointer. We can
imagine that threads only need to observe
m_recycled_stack_top, and that atomicity gives us the
guarantees we need.
// demo3.cpp
#include <atomic>
#include <optional>
struct Job
{
int id;
int data;
};
// (has subtle bug)
// Unbounded lock free queue with push and non-blocking try_pop
//
// Avoids ABA using generation counter
// Avoids UAF by reusing nodes instead of deallocating them
class JobQueue3
{
// alias for std::memory_order
using stdmo = std::memory_order;
// A node is a dummy node if its next pointer is set to QUEUE_END
// We use the next ptr to establish the synchronizes-with relationship
// next is in charge of "releasing" job
struct Node
{
std::atomic<Node*> next = QUEUE_END;
Job job;
};
// we use these as sentinel values.
static inline Node* const QUEUE_END = nullptr;
static inline Node* const STACK_END = QUEUE_END + 1;
struct alignas(16) GenNodePtr
{
Node* node;
uintptr_t gen;
};
static_assert(std::atomic<GenNodePtr>::is_always_lock_free);
alignas(64) std::atomic<Node*> m_queue_back; // producer end
alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end
alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack
public:
// Queue starts with a dummy node
JobQueue3() //
: m_queue_back(new Node())
, m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
, m_recycled_stack_top(GenNodePtr { STACK_END, 1 })
{
}
~JobQueue3()
{
// Assumption: no other threads are accessing the job queue
Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
while(cur_node != QUEUE_END)
{
Node* next = cur_node->next;
delete cur_node;
cur_node = next;
}
// we need to clean up the recycled nodes as well
cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
while(cur_node != STACK_END)
{
Node* next = cur_node->next;
delete cur_node;
cur_node = next;
}
}
// either get a node from the recycling stack if we have some,
// or allocate a new one if we don't.
Node* get_recycled_node_or_allocate_new()
{
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
// if we have no more recycled nodes, return a newly-allocated one.
if(old_stack_top.node == STACK_END)
return new Node();
Node* cur_stack_next = old_stack_top.node->next.load(stdmo::relaxed);
GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };
if(m_recycled_stack_top.compare_exchange_weak( //
old_stack_top, //
new_stack_top, //
stdmo::relaxed))
{
// successfully got a node from the recycling centre
return old_stack_top.node;
}
}
}
// Put node in recycling centre
void add_node_to_recycling_stack(Node* node)
{
// Standard CAS loop with generation counter to avoid ABA.
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
node->next.store(old_stack_top.node, stdmo::relaxed);
GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };
if(m_recycled_stack_top.compare_exchange_weak( //
old_stack_top, //
new_stack_top, //
stdmo::relaxed))
{
break;
}
}
}
public:
void push(Job job)
{
Node* new_dummy = get_recycled_node_or_allocate_new();
new_dummy->next.store(QUEUE_END, stdmo::relaxed);
// the rest is the same...
Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);
work_node->job = job;
work_node->next.store(new_dummy, stdmo::release);
}
std::optional<Job> try_pop()
{
// most of it is the same...
// Splice node from the front of queue, but only if it's not dummy
// Successfully splicing a node establishes global order of pops
// Relaxed ordering because we don't need to synchronize with other
// consumers, we need to synchronize with the producer that made the
// node, which is done via the `next` ptr.
GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
while(true)
{
// this part is similar -- just need an additional `.node` to get the
// node out of the GenNodePtr
Node* old_front_next = old_front.node->next.load(stdmo::acquire);
if(old_front_next == QUEUE_END)
return std::nullopt;
// note that the generation is strictly increasing
GenNodePtr new_front { old_front_next, old_front.gen + 1 };
// this part is also similar, except we CAS with the GenNodePtr instead
// of just a simple Node*.
if(m_queue_front.compare_exchange_weak(old_front, new_front, stdmo::relaxed))
{
// Node now belongs to us
break;
}
}
// only this part changes
Job job = old_front.node->job;
add_node_to_recycling_stack(old_front.node);
return job;
}
};
#include <thread>
int main()
{
JobQueue3 queue;
auto producer1 = std::thread([&queue]() {
for(int i = 1; i <= 150000; i++)
queue.push(Job { i, i });
});
auto producer2 = std::thread([&queue]() {
for(int i = 150001; i <= 300000; i++)
queue.push(Job { i, i });
});
// 3 partial sums
int sum1 = 0;
int sum2 = 0;
int sum3 = 0;
auto consumer_fn = [&queue](int& sum) {
// Sum 100000 things
for(int i = 0; i < 100000; i++)
{
while(true)
{
std::optional<Job> job = queue.try_pop();
if(job)
{
sum += job->id;
break;
}
}
}
};
auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
auto consumer2 = std::thread(consumer_fn, std::ref(sum2));
auto consumer3 = std::thread(consumer_fn, std::ref(sum3));
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
consumer3.join();
printf("Sum of 1 to 300000 modulo integer limit: %d\n", sum1 + sum2 + sum3);
}
Unfortunately, our environmentally-friendly recycling centre has a data race:
==================
WARNING: ThreadSanitizer: data race (pid=3978284)
Write of size 8 at 0x7b0400000008 by thread T1:
#0 JobQueue11B::push(Job) demo3.cpp:134 (demo3.tsan+0xe163d)
<...>
Previous read of size 8 at 0x7b0400000008 by thread T3:
#0 JobQueue11B::try_pop() demo31.cpp:171 (demo3.tsan+0xe0ecd)
<...>
That corresponds to these two pieces of code:
work_node->job = job;
Job job = old_front.node->job;
Take careful note of the ordering of the race here. According to ThreadSanitizer, we tried to read the job first, and then performed the write. While it’s true that when a data race occurs, it’s hard to say which one happened “first”, the order that ThreadSanitizer presents is useful to keep in mind, as it’s usually the more intuitive ordering of events.
In this ordering, it would suggest that the read in
try_pop can potentially see a FUTURE value,
written by a subsequent push. This is now possible because
of node recycling!
To make this race more obvious, let’s draw out our favourite synchronisation diagram:
As we can indeed see, there’s no synchronisation between loading the job in T1 and storing it in T3.
To resolve this, we can make T1 synchronise-with T2 during the
load/store of node->next. The data race happens when the
node that T1 adds to the recycling stack is the one that T2 takes out.
Thus, if T2 does indeed take out node X, it would have read the value
stored by T1. By using release-acquire here, we would have established a
synchronises-with relationship:
With the transitive nature of happens-before, we now have a correct synchronisation between T1 and T3. To do this, we just need to change a few lines of code:
Node* get_recycled_node_or_allocate_new()
{
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
if(old_stack_top.node == STACK_END)
{
return new Node();
}
// here: use **acquire**. synchronise with the release-store of
// node->next in `add_node_to_recycling_stack`
Node* cur_stack_next = old_stack_top.node->next.load(stdmo::acquire);
// the rest is the same...
}
}
void add_node_to_recycling_stack(Node* node)
{
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
// here: use **release**. synchronise with the acquire-load of
// node->next in `get_recycled_node_or_allocate_new`
node->next.store(old_stack_top.node, stdmo::release);
// the rest is the same...
}
}
If we run our fixed implementation, we are finally free of ThreadSanitizer errors!
// demo4.cpp
#include <atomic>
#include <optional>
struct Job
{
int id;
int data;
};
// (is [not quite] correct)
// Unbounded lock free queue with push and non-blocking try_pop
//
// Avoids ABA using generation counter
// Avoids UAF by reusing nodes instead of deallocating them
class JobQueue4
{
// alias for std::memory_order
using stdmo = std::memory_order;
// A node is a dummy node if its next pointer is set to QUEUE_END
// We use the next ptr to establish the synchronizes-with relationship
// next is in charge of "releasing" job
struct Node
{
std::atomic<Node*> next = QUEUE_END;
Job job;
};
// we use these as sentinel values.
static inline Node* const QUEUE_END = nullptr;
static inline Node* const STACK_END = QUEUE_END + 1;
struct alignas(16) GenNodePtr
{
Node* node;
uintptr_t gen;
};
static_assert(std::atomic<GenNodePtr>::is_always_lock_free);
alignas(64) std::atomic<Node*> m_queue_back; // producer end
alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end
alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack
public:
// Queue starts with a dummy node
JobQueue4() //
: m_queue_back(new Node())
, m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
, m_recycled_stack_top(GenNodePtr { STACK_END, 1 })
{
}
~JobQueue4()
{
// Assumption: no other threads are accessing the job queue
Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
while(cur_node != QUEUE_END)
{
Node* next = cur_node->next;
delete cur_node;
cur_node = next;
}
// we need to clean up the recycled nodes as well
cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
while(cur_node != STACK_END)
{
Node* next = cur_node->next;
delete cur_node;
cur_node = next;
}
}
// either get a node from the recycling stack if we have some,
// or allocate a new one if we don't.
Node* get_recycled_node_or_allocate_new()
{
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
if(old_stack_top.node == STACK_END)
{
return new Node();
}
// here: use **acquire**. synchronise with the release-store of
// node->next in `add_node_to_recycling_stack`
Node* cur_stack_next = old_stack_top.node->next.load(stdmo::acquire);
// the rest is the same...
GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };
if(m_recycled_stack_top.compare_exchange_weak( //
old_stack_top, //
new_stack_top, //
stdmo::relaxed))
{
// successfully got a node from the recycling centre
return old_stack_top.node;
}
}
}
// Put node in recycling centre
void add_node_to_recycling_stack(Node* node)
{
GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
while(true)
{
// here: use **release**. synchronise with the acquire-load of
// node->next in `get_recycled_node_or_allocate_new`
node->next.store(old_stack_top.node, stdmo::release);
// the rest is the same...
GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };
if(m_recycled_stack_top.compare_exchange_weak( //
old_stack_top, //
new_stack_top, //
stdmo::relaxed))
{
break;
}
}
}
public:
void push(Job job)
{
Node* new_dummy = get_recycled_node_or_allocate_new();
new_dummy->next.store(QUEUE_END, stdmo::relaxed);
Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);
work_node->job = job;
work_node->next.store(new_dummy, stdmo::release);
}
std::optional<Job> try_pop()
{
// most of it is the same...
// Splice node from the front of queue, but only if it's not dummy
// Successfully splicing a node establishes global order of pops
// Relaxed ordering because we don't need to synchronize with other
// consumers, we need to synchronize with the producer that made the
// node, which is done via the `next` ptr.
GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
while(true)
{
// this part is similar -- just need an additional `.node` to get the
// node out of the GenNodePtr
Node* old_front_next = old_front.node->next.load(stdmo::acquire);
if(old_front_next == QUEUE_END)
return std::nullopt;
// note that the generation is strictly increasing
GenNodePtr new_front { old_front_next, old_front.gen + 1 };
// this part is also similar, except we CAS with the GenNodePtr instead
// of just a simple Node*.
if(m_queue_front.compare_exchange_weak(old_front, //
new_front, stdmo::relaxed))
{
// Node now belongs to us
break;
}
}
Job job = old_front.node->job;
add_node_to_recycling_stack(old_front.node);
return job;
}
};
Just kidding! If we run with the modified test code below that forces the producer thread to yield after pushing every item (allowing consumers to “overtake” it):
#include <thread>
#include <barrier>
int main()
{
JobQueue4 queue;
auto barrier = std::barrier(3);
auto producer1 = std::thread([&queue, &barrier]() {
barrier.arrive_and_wait();
for(int i = 1; i <= 200000; i++)
{
queue.push(Job { i, i });
std::this_thread::yield();
}
});
// 3 partial sums
int sum1 = 0;
int sum2 = 0;
auto consumer_fn = [&queue, &barrier](int& sum) {
barrier.arrive_and_wait();
for(int i = 0; i < 100000; i++)
{
while(true)
{
std::optional<Job> job = queue.try_pop();
if(job)
{
sum += job->id;
break;
}
}
}
};
auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
auto consumer2 = std::thread(consumer_fn, std::ref(sum2));
producer1.join();
consumer1.join();
consumer2.join();
printf("magic number: %d\n", sum1 + sum2);
}
Then we actually get another race! Let’s look at the lines that ThreadSanitizer complains about.
This is what TSan claims is the “writer”:
return new Node();
And this is what it claims is the “reader”:
Node* old_front_next = old_front.node->next.load(stdmo::acquire);
What’s the race here? If we draw a synchronisation diagram, we can see the following:
There is indeed no synchronise-with relationship between T1 and T3. This means that we could potentially get a race between the reader on the right and the writer on the left. Even though this might seem like it violates causality (and/or involves time travel), it is still a data race.
In order to fix this, we need to find a way to synchronise T2 and T3,
so that T1 will synchronise-with T3 as well. We can do this by making
the compare-exchange on the front-of-queue be acquire-release
instead, and the loading of the old_front be an
acquire load:
This corresponds to the following changes in code:
std::optional<Job> try_pop()
{
// most of it is the same...
// but we changed this load to acquire
GenNodePtr old_front = m_queue_front.load(stdmo::acquire);
while(true)
{
Node* old_front_next = old_front.node->next.load(stdmo::acquire);
if(old_front_next == QUEUE_END)
return std::nullopt;
GenNodePtr new_front { old_front_next, old_front.gen + 1 };
// and we change this CAS to be acquire-release
if(m_queue_front.compare_exchange_weak(old_front, //
new_front, stdmo::acq_rel))
{
break;
}
}
Job job = old_front.node->job;
add_node_to_recycling_stack(old_front.node);
return job;
}
Now, if we run it… we should finally, actually, hopefully, have a correct, working implementation of a lock-free concurrent queue.
Now that we have seen both a fine-grained-lock queue and a lock-free queue, how else can we learn which queue a situation calls for, if not through benchmarks!
The specific form of benchmarking we will be looking at is microbenchmarking; microbenchmarks track the performance of a single, well-defined task, and are most useful for CPU work that runs many times (also known as hot code paths).
In this section, we will be benchmarking 3 concurrent queue implementations: a coarse-grained locked queue (CoarseLockedJobQueue), a fine-grained locked queue (FineLockedJobQueue), and our lock-free queue (LockFreeJobQueue).
// locked-coarse-queue.cpp
#include <queue>
#include <mutex>
struct Job
{
int id;
int data;
};
// Unbounded queue with push and non-blocking try_pop
class CoarseLockedJobQueue
{
std::queue<Job> jobs;
std::mutex mut;
public:
CoarseLockedJobQueue() { }
void push(Job job)
{
std::unique_lock lock { mut };
jobs.push(job);
}
std::optional<Job> try_pop()
{
std::unique_lock lock { mut };
if(jobs.empty())
{
return std::nullopt;
}
else
{
Job job = jobs.front();
jobs.pop();
return job;
}
}
};
// locked-fine-queue.cpp
#include <mutex>
struct Job
{
int id;
int data;
};
// Unbounded queue with push and non-blocking pop
// Use fine grained locking to decouple producers and consumers, though we
// no longer have blocking pop
class FineLockedJobQueue
{
// A node can be a dummy node (contains no jobs and next == nullptr) or
// it can be a regular node (contains a job and next != nullptr)
// The last node in the queue (node at producer end) is always the dummy
// node, all other nodes are regular nodes
struct Node
{
std::mutex mut {};
Node* next = nullptr;
Job job {};
};
alignas(64) std::mutex mut_back;
Node* jobs_back; // producer end
alignas(64) std::mutex mut_front;
Node* jobs_front; // consumer end
public:
// Queue starts with a dummy node
FineLockedJobQueue() //
: mut_back()
, jobs_back(new Node())
, mut_front()
, jobs_front(jobs_back)
{
}
~FineLockedJobQueue()
{
// Assumption: no other threads are accessing the job queue
while(jobs_front != nullptr)
{
Node* next = jobs_front->next;
delete jobs_front;
jobs_front = next;
}
}
void push(Job job)
{
// Make a new dummy node
Node* new_node = new Node {};
std::unique_lock lock { mut_back };
// Turn old dummy node into regular node
std::unique_lock lock_node { jobs_back->mut };
jobs_back->job = job;
jobs_back->next = new_node;
jobs_back = new_node;
}
std::optional<Job> try_pop()
{
Node* old_node;
{
std::unique_lock lock { mut_front };
old_node = jobs_front;
std::unique_lock lock_node { old_node->mut };
if(old_node->next == nullptr)
{
// Node was dummy, so the queue is empty
return std::nullopt;
}
// Node was not dummy, delete node and return job
jobs_front = jobs_front->next;
}
Job job = old_node->job;
delete old_node;
return job;
}
};
When benchmarking a piece of code, we first decide the metric by which we evaluate it. Some common examples include CPU cycles, MFLOPS, and real time (the wall-clock time taken from the start to the end of a program).
To understand how each queue scales under contention with different numbers of producers and consumers, we will measure the real time taken for a few different setups for each queue:
In this setup, we use a std::barrier to
ensure all producer and consumer threads have reached the start of the
benchmark loop before starting the timer and commencing execution. When
all threads have finished executing, we stop the timer. Within the loop,
we busily spin on a variable in between enqueues/dequeues to simulate
work done on the threads, in an effort to emulate real-world usage.
Try running the code below on Godbolt (or locally if possible!) and observe how each queue performs in different setups. Note that the execution takes about 30 seconds, so please be patient.
template <typename JobQueueImpl,
std::ptrdiff_t ProducerCount,
std::ptrdiff_t ConsumerCount>
sc::microseconds benchmark_prod_con()
{
static_assert( //
((static_cast<int>(ProducerCount) * loop_iters) % ConsumerCount) == 0,
"Invalid total thread count");
JobQueueImpl queue;
// we use the main thread to keep track of the time
auto thread_count = ProducerCount + ConsumerCount + 1;
auto barrier = std::barrier(thread_count);
auto producer_func = [&queue, &barrier](int id) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(5, 15);
barrier.arrive_and_wait();
for(int i = 0; i < loop_iters; ++i)
{
queue.push(Job { id, i });
#ifndef HIGH_CONTENTION
int work_cycles = distrib(gen);
while(--work_cycles)
__asm__("nop");
#endif
}
};
auto consumer_iter = (ProducerCount * loop_iters) / ConsumerCount;
auto consumer_func = [&queue, &barrier, consumer_iter]() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(5, 15);
barrier.arrive_and_wait();
for(int i = 0; i < consumer_iter; ++i)
{
auto job = queue.try_pop();
while(!job)
{
__asm__("pause");
job = queue.try_pop();
}
do_something(*job);
#ifndef HIGH_CONTENTION
int work_cycles = distrib(gen);
while(--work_cycles)
__asm__("nop");
#endif
}
};
std::vector<std::thread> producers;
for(size_t i = 1; i <= ProducerCount; ++i)
producers.emplace_back(producer_func, i);
std::vector<std::thread> consumers;
for(size_t i = 1; i <= ConsumerCount; ++i)
consumers.emplace_back(consumer_func);
auto start = sc::high_resolution_clock::now();
barrier.arrive_and_wait();
for(size_t i = 0; i < ProducerCount; ++i)
producers[i].join();
for(size_t i = 0; i < ConsumerCount; ++i)
consumers[i].join();
auto stop = sc::high_resolution_clock::now();
return sc::duration_cast<sc::microseconds>(stop - start);
}
Running with simulated work between consecutive enqueues/dequeues:
SPSC benchmarks:
SPSC CoarseLockedJobQueue took: 260130µs
SPSC FineLockedJobQueue took: 233000µs
SPSC LockFreeJobQueue took: 266310µs
SPMC benchmarks:
1P2C CoarseLockedJobQueue took: 251312µs
1P2C FineLockedJobQueue took: 367206µs
1P2C LockFreeJobQueue took: 296053µs
1P4C CoarseLockedJobQueue took: 726801µs
1P4C FineLockedJobQueue took: 365763µs
1P4C LockFreeJobQueue took: 335175µs
MPSC benchmarks:
2P1C CoarseLockedJobQueue took: 324418µs
2P1C FineLockedJobQueue took: 958496µs
2P1C LockFreeJobQueue took: 429071µs
4P1C CoarseLockedJobQueue took: 867296µs
4P1C FineLockedJobQueue took: 2915416µs
4P1C LockFreeJobQueue took: 771916µs
MPMC benchmarks:
2P2C CoarseLockedJobQueue took: 484619µs
2P2C FineLockedJobQueue took: 698458µs
2P2C LockFreeJobQueue took: 604591µs
4P4C CoarseLockedJobQueue took: 1577709µs
4P4C FineLockedJobQueue took: 1466190µs
4P4C LockFreeJobQueue took: 1175500µs
8P8C CoarseLockedJobQueue took: 3701999µs
8P8C FineLockedJobQueue took: 3390677µs
8P8C LockFreeJobQueue took: 3067856µs
Next, try adding the compilation flag -DHIGH_CONTENTION. This removes the simulated work between enqueues/dequeues and increases the contention on both ends of the queue. Observe how the performance of the lock-free queue degrades.
In real-world use cases, the access patterns of different threads may differ. In the Godbolt links above, we have included another benchmark where each thread:
Add the compilation flag -DED_PAIR.
© CS3211 Teaching Team, All Rights Reserved